package com.whfc.flux.dao.impl;

import com.alibaba.fastjson.JSON;
import com.whfc.common.enums.DelFlag;
import com.whfc.common.result.PageData;
import com.whfc.common.result.PageVO;
import com.whfc.emp.constant.EmpMeasurement;
import com.whfc.emp.dto.AppDeviceCardLogCacheDTO;
import com.whfc.emp.dto.AppDeviceCardLogDTO;
import com.whfc.emp.entity.AppDeviceCardLog;
import com.whfc.flux.dao.AppDeviceCardLogDao;
import org.apache.commons.lang3.StringUtils;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBMapper;
import org.influxdb.querybuilder.Select;
import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import org.springframework.util.ObjectUtils;

import java.util.*;
import java.util.concurrent.TimeUnit;

import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*;

/**
 * @Description: InfluxDB-backed implementation of {@link AppDeviceCardLogDao}. Device-card log
 * records are written as points ({@code deviceId} and {@code empId} as tags, every other
 * attribute as a field) and read back through the influxdb-java query builder.
 * @author: xugcheng
 * @version: 1.0
 * @date: 2021-01-28 9:57
 */
@Repository
public class AppDeviceCardLogDaoImpl implements AppDeviceCardLogDao {

    private static final Logger logger = LoggerFactory.getLogger(AppDeviceCardLogDaoImpl.class);

    /**
     * Database name.
     */
    private static final String database = EmpMeasurement.DATABASE;

    /**
     * Measurement (table) name.
     */
    private static final String measurement = EmpMeasurement.MEASUREMENT;

    /**
     * Retention policy (per the original note: keeps 10 years of data).
     */
    private static final String retentionPolicy = EmpMeasurement.RETENTION_POLICY;

    /**
     * Precision of the point timestamp: seconds.
     */
    private static final TimeUnit timeUnit = TimeUnit.SECONDS;

    /**
     * Columns selected when the caller does not supply its own column list.
     */
    private static final String[] COLUMNS = {
            "empId", "deviceId", "time", "lng", "lat", "lngWgs84", "latWgs84", "location", "speed",
            "alarmValue", "alarmReport", "alarmSos", "alarmDrop", "alarmDoff", "alarmStill", "alarmCrash",
            "cardStateValue", "posType", "posState", "posMode", "batterVolt", "batteryPower", "serverTime"
    };

    @Autowired
    private InfluxDB influxDB;

    @Autowired
    private InfluxDBMapper influxDBMapper;

    /**
     * Writes a single log record as one point.
     *
     * @param record the record to store; its time must not be null
     */
    @Override
    public void insert(AppDeviceCardLog record) {
        logger.debug("influxdb的insert方法");
        Point point = this.buildDataPoint(record);
        logger.info("influxdb insert point:{}", point);
        // Pass database/retention policy per call instead of mutating the shared client,
        // so concurrent users of the same InfluxDB bean cannot redirect this write.
        influxDB.write(database, retentionPolicy, point);
    }

    /**
     * Writes a list of log records as one batch. A null or empty list is a no-op.
     *
     * @param records the records to store
     */
    @Override
    public void batchInsert(List<AppDeviceCardLog> records) {
        logger.debug("influxdb的batchInsert方法");
        if (records == null || records.isEmpty()) {
            return;
        }
        // The batch carries its own database and retention policy; the shared client is not mutated.
        BatchPoints.Builder batchBuilder = BatchPoints.database(database).retentionPolicy(retentionPolicy);
        for (AppDeviceCardLog record : records) {
            batchBuilder.point(this.buildDataPoint(record));
        }
        influxDB.write(batchBuilder.build());
    }

    /**
     * Lists an employee's logs within a time range (ascending by time) using the default columns.
     *
     * @param empId     employee id
     * @param startTime inclusive range start
     * @param endTime   inclusive range end
     * @return the matching logs
     */
    @Override
    public List<AppDeviceCardLogDTO> selectHelmetDataLogListByEmpId(Integer empId, Date startTime, Date endTime) {
        logger.debug("influxdb的selectHelmetDataLogListByEmpId方法");
        return this.selectHelmetDataLogListByEmpId(empId, startTime, endTime, COLUMNS);
    }

    /**
     * Pages through an employee's logs within a time range (descending by time) using the default columns.
     *
     * @param empId     employee id
     * @param startTime inclusive range start
     * @param endTime   inclusive range end
     * @param pagNum    1-based page number
     * @param pageSize  page size, at least 1
     * @return the requested page
     */
    @Override
    public PageVO<AppDeviceCardLog> selectHelmetDataLogListByEmpIdPage(Integer empId, Date startTime, Date endTime,Integer pagNum,Integer pageSize) {
        logger.debug("influxdb的selectHelmetDataLogListByEmpIdPage方法");
        return this.selectHelmetDataLogListByEmpIdPage(empId, startTime, endTime, COLUMNS, pagNum, pageSize);
    }

    /**
     * Lists an employee's logs within a time range, ascending by time.
     *
     * @param empId     employee id
     * @param startTime inclusive range start
     * @param endTime   inclusive range end
     * @param columns   columns to select
     * @return the matching logs
     */
    @Override
    public List<AppDeviceCardLogDTO> selectHelmetDataLogListByEmpId(Integer empId, Date startTime, Date endTime, String[] columns) {
        logger.debug("influxdb的selectHelmetDataLogListByEmpId方法");
        Query query = select(columns).from(database, measurement)
                .where(eq("empId", String.valueOf(empId)))
                .and(gte("time", toTimestamp(startTime)))
                .and(lte("time", toTimestamp(endTime)))
                .orderBy(asc());
        logger.info(query.getCommand());
        List<AppDeviceCardLogDTO> list = influxDBMapper.query(query, AppDeviceCardLogDTO.class);
        this.transformTime(list);
        return list;
    }

    /**
     * Pages through an employee's non-deleted logs within a time range, descending by time.
     *
     * @param empId     employee id
     * @param startTime inclusive range start
     * @param endTime   inclusive range end
     * @param columns   columns to select
     * @param pageNum   1-based page number
     * @param pageSize  page size, at least 1
     * @return the requested page; the list is empty when {@code pageNum} is past the last page
     * @throws IllegalArgumentException if {@code pageNum} or {@code pageSize} is null or less than 1
     */
    @Override
    public PageVO<AppDeviceCardLog> selectHelmetDataLogListByEmpIdPage(Integer empId, Date startTime, Date endTime, String[] columns,Integer pageNum,Integer pageSize) {
        logger.debug("influxdb的selectHelmetDataLogListByEmpIdPage方法");
        checkPaging(pageNum, pageSize);

        int total = this.countLogs("empId", empId, startTime, endTime);
        int pages = pageCount(total, pageSize);
        List<AppDeviceCardLogDTO> list = Collections.emptyList();
        if (pageNum <= pages) {
            list = this.queryLogPage("empId", empId, startTime, endTime, columns, pageNum, pageSize);
        }

        PageVO<AppDeviceCardLog> pageData = new PageVO<>();
        pageData.setList(toAppDeviceCardLogCacheDTO(list));
        pageData.setTotal((long) total);
        pageData.setPages(pages);
        pageData.setPageNum(pageNum);
        pageData.setPageSize(pageSize);
        return pageData;
    }

    /**
     * Pages through a device's logs within a time range (descending by time) using the default columns.
     *
     * @param deviceId  device id
     * @param startTime inclusive range start
     * @param endTime   inclusive range end
     * @param pageNum   1-based page number
     * @param pageSize  page size, at least 1
     * @return the requested page
     */
    @Override
    public PageData<AppDeviceCardLogDTO> selectHelmetDataLogListByDeviceId(Integer deviceId, Date startTime, Date endTime, Integer pageNum, Integer pageSize) {
        return this.selectHelmetDataLogListByDeviceId(deviceId, startTime, endTime, pageNum, pageSize, COLUMNS);
    }

    /**
     * Pages through a device's non-deleted logs within a time range, descending by time.
     *
     * @param deviceId  device id
     * @param startTime inclusive range start
     * @param endTime   inclusive range end
     * @param pageNum   1-based page number
     * @param pageSize  page size, at least 1
     * @param columns   columns to select
     * @return the requested page; the list is empty when {@code pageNum} is past the last page
     * @throws IllegalArgumentException if {@code pageNum} or {@code pageSize} is null or less than 1
     */
    @Override
    public PageData<AppDeviceCardLogDTO> selectHelmetDataLogListByDeviceId(Integer deviceId, Date startTime, Date endTime, Integer pageNum, Integer pageSize, String[] columns) {
        logger.debug("influxdb的selectHelmetDataLogListByDeviceId方法");
        checkPaging(pageNum, pageSize);

        int total = this.countLogs("deviceId", deviceId, startTime, endTime);
        int pages = pageCount(total, pageSize);
        List<AppDeviceCardLogDTO> list = Collections.emptyList();
        if (pageNum <= pages) {
            list = this.queryLogPage("deviceId", deviceId, startTime, endTime, columns, pageNum, pageSize);
        }

        PageData<AppDeviceCardLogDTO> pageData = new PageData<>();
        pageData.setList(list);
        pageData.setTotal((long) total);
        pageData.setPages(pages);
        pageData.setPageNum(pageNum);
        pageData.setPageSize(pageSize);
        return pageData;
    }

    /**
     * Counts log points per employee in 2-hour buckets ({@code count(ts)}, empty buckets filled
     * with 0), descending by time.
     *
     * @param empIds    employee ids to include; a null or empty list yields an empty result
     * @param startTime inclusive range start
     * @param endTime   inclusive range end
     * @return the rows mapped onto {@link AppDeviceCardLogDTO}
     */
    @Override
    public List<AppDeviceCardLogDTO> selectHelmetDTOByEmpIds(List<Integer> empIds, Date startTime, Date endTime) {
        if (empIds == null || empIds.isEmpty()) {
            // Without ids the nested OR group would be empty and the WHERE clause malformed.
            return Collections.emptyList();
        }

        WhereQueryImpl where = select()
                .raw("count(ts)")
                .from(database, measurement)
                .where();
        // (empId = a OR empId = b ...) grouped so it combines correctly with the time range below.
        WhereNested whereNested = where.andNested();
        for (Integer empId : empIds) {
            whereNested.or(eq("empId", String.valueOf(empId)));
        }
        whereNested.close();

        Query query = where
                .and(gte("time", toTimestamp(startTime)))
                .and(lte("time", toTimestamp(endTime)))
                .groupBy("empId", time(2L, "h"))
                .fill(0)
                .orderBy(desc());
        logger.info(query.getCommand());
        List<AppDeviceCardLogDTO> list = influxDBMapper.query(query, AppDeviceCardLogDTO.class);
        logger.debug("查询结果:{}", list);
        return list;
    }

    /**
     * Formats a date as the ISO-8601 instant string used in the time-range conditions.
     *
     * @param date the date to format
     * @return {@code date.toInstant().toString()}
     */
    private static String toTimestamp(Date date) {
        return date.toInstant().toString();
    }

    /**
     * Rejects paging arguments that cannot describe a page.
     *
     * @param pageNum  1-based page number
     * @param pageSize page size
     * @throws IllegalArgumentException if either value is null or less than 1
     */
    private static void checkPaging(Integer pageNum, Integer pageSize) {
        if (pageNum == null || pageNum < 1) {
            throw new IllegalArgumentException("pageNum must be >= 1, but was " + pageNum);
        }
        if (pageSize == null || pageSize < 1) {
            throw new IllegalArgumentException("pageSize must be >= 1, but was " + pageSize);
        }
    }

    /**
     * Number of pages needed to hold {@code total} rows.
     *
     * @param total    total row count
     * @param pageSize page size, at least 1
     * @return {@code ceil(total / pageSize)}
     */
    private static int pageCount(int total, int pageSize) {
        return (total / pageSize) + ((total % pageSize) == 0 ? 0 : 1);
    }

    /**
     * Counts the non-deleted logs for one tag value within a time range.
     * <p>
     * The filter matches {@link #queryLogPage} so the total agrees with the rows that can be listed.
     *
     * @param tagKey    tag to filter on ({@code "empId"} or {@code "deviceId"})
     * @param tagValue  tag value
     * @param startTime inclusive range start
     * @param endTime   inclusive range end
     * @return the number of matching points, 0 when the query returns no row
     */
    private int countLogs(String tagKey, Integer tagValue, Date startTime, Date endTime) {
        Query countQuery = select().count("serverTime")
                .from(database, measurement)
                .where(eq(tagKey, String.valueOf(tagValue)))
                .and(eq("delFlag", DelFlag.UNDELETE.getValue()))
                .and(gte("time", toTimestamp(startTime)))
                .and(lte("time", toTimestamp(endTime)));
        logger.info(countQuery.getCommand());
        QueryResult queryResult = influxDB.query(countQuery, TimeUnit.MILLISECONDS);
        Map<String, Object> resultMap = this.parseQueryResultMap(queryResult);

        Object count = resultMap.get("count");
        int total = 0;
        if (count instanceof Number) {
            total = ((Number) count).intValue();
        } else if (count != null) {
            total = Double.valueOf(count.toString()).intValue();
        }
        logger.info("数量总数量total={}", total);
        return total;
    }

    /**
     * Fetches one page of non-deleted logs for one tag value, descending by time.
     *
     * @param tagKey    tag to filter on ({@code "empId"} or {@code "deviceId"})
     * @param tagValue  tag value
     * @param startTime inclusive range start
     * @param endTime   inclusive range end
     * @param columns   columns to select
     * @param pageNum   1-based page number
     * @param pageSize  page size
     * @return the page rows with {@code time}/{@code createTime} populated
     */
    private List<AppDeviceCardLogDTO> queryLogPage(String tagKey, Integer tagValue, Date startTime, Date endTime,
                                                   String[] columns, int pageNum, int pageSize) {
        Select select = select(columns).from(database, measurement)
                .where(eq(tagKey, String.valueOf(tagValue)))
                .and(eq("delFlag", DelFlag.UNDELETE.getValue()))
                .andNested()
                .and(gte("time", toTimestamp(startTime)))
                .and(lte("time", toTimestamp(endTime)))
                .close()
                .orderBy(desc());
        int offset = (pageNum - 1) * pageSize;
        int limit = pageSize;
        Query query = offset == 0 ? select.limit(limit) : select.limit(limit, offset);
        logger.info(query.getCommand());
        List<AppDeviceCardLogDTO> list = influxDBMapper.query(query, AppDeviceCardLogDTO.class);
        this.transformTime(list);
        return list;
    }

    /**
     * Builds the InfluxDB point for a record. {@code deviceId} and {@code empId} become tags;
     * every other non-empty attribute becomes a field. {@code serverTime} (current time) and
     * {@code ts} (record time) are always written, both in epoch seconds.
     *
     * @param record the record to convert; its time must not be null
     * @return the point, timestamped with the record time in seconds
     */
    private Point buildDataPoint(AppDeviceCardLog record) {
        logger.debug("influxdb的buildDataPoint方法");
        Point.Builder builder = Point
                .measurement(measurement)
                .time(record.getTime().toInstant().getEpochSecond(), timeUnit);
        if (!ObjectUtils.isEmpty(record.getDeviceId())) {
            builder.tag("deviceId", String.valueOf(record.getDeviceId()));
        }
        if (!ObjectUtils.isEmpty(record.getEmpId())) {
            builder.tag("empId", String.valueOf(record.getEmpId()));
        }
        if (!ObjectUtils.isEmpty(record.getLng())) {
            builder.addField("lng", record.getLng());
        }
        if (!ObjectUtils.isEmpty(record.getLat())) {
            builder.addField("lat", record.getLat());
        }
        if (!ObjectUtils.isEmpty(record.getLngWgs84())) {
            builder.addField("lngWgs84", record.getLngWgs84());
        }
        if (!ObjectUtils.isEmpty(record.getLatWgs84())) {
            builder.addField("latWgs84", record.getLatWgs84());
        }
        if (!StringUtils.isEmpty(record.getLocation())) {
            builder.addField("location", record.getLocation());
        }
        // NOTE(review): "batterPower" looks like a misspelling of "batteryPower", which is also
        // written below with the same value. Kept because removing it would change the stored
        // schema — confirm no reader depends on it before dropping.
        if (!ObjectUtils.isEmpty(record.getBatteryPower())) {
            builder.addField("batterPower", record.getBatteryPower());
        }
        if (!ObjectUtils.isEmpty(record.getSpeed())) {
            builder.addField("speed", record.getSpeed());
        }
        if (!ObjectUtils.isEmpty(record.getRotation())) {
            builder.addField("rotation", record.getRotation());
        }
        if (!ObjectUtils.isEmpty(record.getVersion())) {
            builder.addField("version", record.getVersion());
        }
        if (!ObjectUtils.isEmpty(record.getGeoNum())) {
            builder.addField("geoNum", record.getGeoNum());
        }
        if (!ObjectUtils.isEmpty(record.getLevelFactor())) {
            builder.addField("levelFactor", record.getLevelFactor());
        }
        if (!ObjectUtils.isEmpty(record.getRssi())) {
            builder.addField("rssi", record.getRssi());
        }
        if (!ObjectUtils.isEmpty(record.getAlarmValue())) {
            builder.addField("alarmValue", record.getAlarmValue());
        }
        if (!ObjectUtils.isEmpty(record.getAlarmReport())) {
            builder.addField("alarmReport", record.getAlarmReport());
        }
        if (!ObjectUtils.isEmpty(record.getAlarmSos())) {
            builder.addField("alarmSos", record.getAlarmSos());
        }
        if (!ObjectUtils.isEmpty(record.getAlarmDrop())) {
            builder.addField("alarmDrop", record.getAlarmDrop());
        }
        if (!ObjectUtils.isEmpty(record.getAlarmDoff())) {
            builder.addField("alarmDoff", record.getAlarmDoff());
        }
        if (!ObjectUtils.isEmpty(record.getAlarmStill())) {
            builder.addField("alarmStill", record.getAlarmStill());
        }
        if (!ObjectUtils.isEmpty(record.getAlarmCrash())) {
            builder.addField("alarmCrash", record.getAlarmCrash());
        }
        if (!ObjectUtils.isEmpty(record.getCardStateValue())) {
            builder.addField("cardStateValue", record.getCardStateValue());
        }
        if (!ObjectUtils.isEmpty(record.getPosType())) {
            builder.addField("posType", record.getPosType());
        }
        if (!ObjectUtils.isEmpty(record.getPosState())) {
            builder.addField("posState", record.getPosState());
        }
        if (!ObjectUtils.isEmpty(record.getPosMode())) {
            builder.addField("posMode", record.getPosMode());
        }
        if (!ObjectUtils.isEmpty(record.getBatterVolt())) {
            builder.addField("batterVolt", record.getBatterVolt());
        }
        if (!ObjectUtils.isEmpty(record.getBatteryPower())) {
            builder.addField("batteryPower", record.getBatteryPower());
        }
        if (!ObjectUtils.isEmpty(record.getDelFlag())) {
            builder.addField("delFlag", record.getDelFlag());
        }
        builder.addField("serverTime", System.currentTimeMillis() / 1000);
        builder.addField("ts", record.getTime().getTime() / 1000);
        return builder.build();
    }

    /**
     * Fills the {@code Date}-typed properties of each DTO from the raw query values:
     * {@code time} from the device instant and {@code createTime} from the server time
     * (epoch seconds).
     *
     * @param list the DTOs to update in place
     */
    private void transformTime(List<AppDeviceCardLogDTO> list) {
        for (AppDeviceCardLogDTO dto : list) {
            if (dto.getDeviceTime() != null) {
                dto.setTime(Date.from(dto.getDeviceTime()));
            }
            if (!ObjectUtils.isEmpty(dto.getServerTime())) {
                // 1000L forces long arithmetic so the seconds -> millis conversion cannot overflow.
                dto.setCreateTime(new Date(dto.getServerTime() * 1000L));
            }
        }
    }

    /**
     * Converts the first row of the first series of a query result into a column-to-value map.
     *
     * @param queryResult the raw query result, may be null
     * @return the first row keyed by column name, or an empty map when there is no row
     */
    private Map<String, Object> parseQueryResultMap(QueryResult queryResult) {
        if (queryResult == null) {
            return Collections.emptyMap();
        }
        List<QueryResult.Result> results = queryResult.getResults();
        if (results == null || results.isEmpty()) {
            return Collections.emptyMap();
        }
        List<QueryResult.Series> seriesList = results.get(0).getSeries();
        if (seriesList == null || seriesList.isEmpty()) {
            return Collections.emptyMap();
        }

        QueryResult.Series series = seriesList.get(0);
        List<String> columns = series.getColumns();
        List<List<Object>> values = series.getValues();
        logger.info("columns:{}", columns);
        logger.info("values:{}", values);
        if (values == null || values.isEmpty()) {
            return Collections.emptyMap();
        }

        List<Object> firstRow = values.get(0);
        Map<String, Object> map = new HashMap<>(columns.size());
        for (int i = 0; i < columns.size(); i++) {
            map.put(columns.get(i), firstRow.get(i));
        }
        return map;
    }

    /**
     * Converts every series of the first result into a map holding the series' first row
     * (keyed by column name) plus the series tags. Only the first row of each series is read.
     *
     * @param queryResult the raw query result, may be null
     * @return one map per series, or an empty list when there is no series
     */
    private List<Map<String, Object>> parseQueryResultList(QueryResult queryResult) {
        if (queryResult == null) {
            return Collections.emptyList();
        }
        List<QueryResult.Result> results = queryResult.getResults();
        if (results == null || results.isEmpty()) {
            return Collections.emptyList();
        }
        List<QueryResult.Series> seriesList = results.get(0).getSeries();
        if (seriesList == null || seriesList.isEmpty()) {
            return Collections.emptyList();
        }

        List<Map<String, Object>> list = new ArrayList<>(seriesList.size());
        for (QueryResult.Series series : seriesList) {
            // Tags are only present for grouped queries; treat a missing tag map as empty.
            Map<String, String> tags = series.getTags();
            if (tags == null) {
                tags = Collections.emptyMap();
            }
            List<String> columns = series.getColumns();
            List<List<Object>> values = series.getValues();
            Map<String, Object> map = new HashMap<>(columns.size() + tags.size());
            logger.info("tags:{}", tags);
            logger.info("columns:{}", columns);
            logger.info("values:{}", values);
            if (values != null && !values.isEmpty()) {
                List<Object> firstRow = values.get(0);
                for (int j = 0; j < columns.size(); j++) {
                    map.put(columns.get(j), firstRow.get(j));
                }
            }
            map.putAll(tags);
            list.add(map);
        }
        return list;
    }

    /**
     * Copies each DTO into a new {@link AppDeviceCardLog} via bean property copy.
     *
     * @param logs the DTOs to convert
     * @return a new list with one entity per DTO, in the same order
     */
    private List<AppDeviceCardLog> toAppDeviceCardLogCacheDTO(List<AppDeviceCardLogDTO> logs) {
        List<AppDeviceCardLog> ret = new ArrayList<>(logs.size());
        for (AppDeviceCardLogDTO log : logs) {
            AppDeviceCardLog entity = new AppDeviceCardLog();
            BeanUtils.copyProperties(log, entity);
            ret.add(entity);
        }
        return ret;
    }
}
